package com.chatplus.application.scheduled;

import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.chatplus.application.common.logging.SouthernQuietLogger;
import com.chatplus.application.common.logging.SouthernQuietLoggerFactory;
import com.chatplus.application.domain.entity.framework.PendingNotificationEntity;
import com.chatplus.application.service.framework.PendingNotificationService;
import com.chatplus.application.web.notification.NotificationPublisher;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.springframework.stereotype.Component;

import java.time.Instant;
import java.util.List;

/**
 * Scheduler implementation for the retry records of failed queue messages.
 *
 * <p>Each run loads one page of {@link PendingNotificationEntity} rows that were created more than
 * {@code MIN_AGE_SECONDS} seconds ago and hands every row back to the {@link NotificationPublisher}.
 * A row that fails to re-publish does not stop the remaining rows of the same run from being attempted.
 */
@Component
public class PendingNotificationJob {

    private static final SouthernQuietLogger LOGGER = SouthernQuietLoggerFactory.getLogger(PendingNotificationJob.class);

    /**
     * Page index requested from the service.
     * NOTE(review): kept at 0 as in the original; MyBatis-Plus pages are normally 1-based — confirm that
     * 0 is treated as the first page by the configured pagination plugin.
     */
    private static final long PAGE_INDEX = 0;

    /** Maximum number of pending notifications fetched (and re-published) per run. */
    private static final long BATCH_SIZE = 10000;

    /** Only rows created at least this many seconds ago are picked up. */
    private static final long MIN_AGE_SECONDS = 2;

    private final PendingNotificationService pendingNotificationService;
    private final NotificationPublisher notificationPublisher;

    public PendingNotificationJob(PendingNotificationService pendingNotificationService, NotificationPublisher notificationPublisher) {
        this.pendingNotificationService = pendingNotificationService;
        this.notificationPublisher = notificationPublisher;
    }

    /**
     * Job entry point: re-publishes every pending notification of the current batch.
     *
     * <p>Rows are selected by {@code createdAt < now - MIN_AGE_SECONDS} and ordered by id descending.
     * Every row is attempted even if an earlier one fails, so a single broken record (for example one
     * whose body is not valid JSON) cannot block the rest of the batch.
     *
     * @throws RuntimeException the first failure raised while re-publishing, rethrown after the whole
     *                          batch has been attempted; later failures are attached as suppressed
     *                          exceptions so the job is still reported as failed
     */
    @XxlJob("PendingNotificationJob#resendNotification")
    public void resendNotification() {
        PageDTO<PendingNotificationEntity> pageDto = new PageDTO<>(PAGE_INDEX, BATCH_SIZE);
        // Only pick up rows that were stored at least MIN_AGE_SECONDS seconds ago.
        Instant createdBefore = Instant.now().minusSeconds(MIN_AGE_SECONDS);
        Wrapper<PendingNotificationEntity> wrapper = Wrappers.<PendingNotificationEntity>lambdaQuery()
                .lt(PendingNotificationEntity::getCreatedAt, createdBefore)
                .orderByDesc(PendingNotificationEntity::getId);
        List<PendingNotificationEntity> pendingNotificationEntities = pendingNotificationService
                .page(pageDto, wrapper)
                .getRecords();

        RuntimeException firstFailure = null;
        for (PendingNotificationEntity notification : pendingNotificationEntities) {
            try {
                resendNotification(notification);
            } catch (RuntimeException e) {
                // Keep going: remember the failure and surface it once the batch is done.
                if (firstFailure == null) {
                    firstFailure = e;
                } else if (firstFailure != e) {
                    // addSuppressed rejects self-suppression, hence the identity check above.
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Re-publishes a single pending notification and logs the attempt.
     *
     * <p>The stored body is parsed as a JSON object and its {@code "payload"} entry is passed to the
     * publisher together with the row's stream key and message id.
     *
     * @param notification the pending row to re-publish
     */
    private void resendNotification(PendingNotificationEntity notification) {
        String streamKey = notification.getStreamKey();
        String recordId = notification.getMessageId();
        JSONObject message = JSONUtil.parseObj(notification.getBody());
        notificationPublisher.publish(message.get("payload"), streamKey, recordId);
        LOGGER.message("重试发布通知")
                .context("streamKey", streamKey)
                .context("recordId", recordId)
                .context("当前发送次数", notification.getRetryNum())
                .info();
    }

}
